package pipeline

import (
	"sort"
	"io"
	"encoding/binary"
	"math/rand"
	"log"
	"time"
)

func ArraySource(a ...int ) <-chan int  {
	out := make(chan int)
	go func() {
		for _,v := range a {
			out <- v
		}
		close(out)
	}()
	return out
}
//sort
var startTime time.Time

func Init()  {
	startTime = time.Now()
}
func InMemorySort(in <-chan int )  <- chan int  {
	out := make(chan int,1024)
	go func() {
		//read into memroy
		a := []int {}
		for v := range in {
			a = append(a,v)
		}
		log.Printf("Read Done :%d",time.Now().Sub(startTime))
		sort.Ints(a)
		log.Printf("InMemory Done :%d",time.Now().Sub(startTime))
		//send to channel
		for _,v := range a {
			out <- v
		}
		close(out)
	}()
	return out
}
func Merge(in1,in2 <-chan int ) <- chan int   {
	out :=make(chan int,1024)
	go func() {
		v1,ok1 := <- in1
		v2,ok2 := <- in2
		for ok1 || ok2 {
			if !ok2 ||(ok1 && 	v1 <= v2 ) {
				out <- v1
				v1,ok1 = <- in1
			}else {
				out <- v2
				v2,ok2 = <- in2
			}
		}
		close(out)
		log.Printf("Merge Done :%d",time.Now().Sub(startTime))
	}()
	return out
}
func ReadSource(reader  io.Reader,chunksize int) <-chan int {
	out :=make(chan int,1024)
	go func() {
		buffer :=make([]byte,8)
		bytesRead :=0
		for {
			n,err := reader.Read(buffer)
			if n> 0 {
				v := int(binary.BigEndian.Uint64(buffer))
				bytesRead +=n
				out <- v
			}
			if err != nil ||
				(chunksize !=-1 && bytesRead >= chunksize) {
				break
			}
		}
		close(out)
	}()
	return out
}
func WriteSink(write io.Writer,in <- chan int )  {
	for v:=range in {
		buffer :=make([]byte,8)
		binary.BigEndian.PutUint64(buffer,uint64(v))
		write.Write(buffer)
	}
}
func RandomSource(n int) <-chan int {
	out :=make(chan int,1024 )
	go func() {
		for i :=0;i<n;i++{
			r :=rand.Int()
			//log.Println(r)
			out <- r
		}
		close(out)
	}()
	return out
}
func MergeN(inputs ...<- chan int) <- chan int  {
	if len(inputs) == 1 {
		return inputs[0]
	}
	m := len(inputs)/2
	return Merge(
		MergeN(inputs[:m]...),
		MergeN(inputs[m:]...),
	)
}